package net.chengp.esload4j.source.mysql;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

import lombok.extern.slf4j.Slf4j;
import net.chengp.esload4j.config.ESLoad4jConfig.MysqlSourceConfig;
import net.chengp.esload4j.source.mysql.db.MySQLDaoImpl;
import net.chengp.esload4j.target.es.ESLoad;

/**
 * 
 * @author chengp
 * @email E-mail:chengp@chengp.net
 * @version 创建时间：2020年9月12日 下午3:23:52 
 *
 */
@Slf4j
public class MySQLToESDataLoadThread implements Runnable{
	
	public static volatile AtomicLong count = new AtomicLong();
	
	private MysqlSourceConfig mysqlSourceConfig;

	private ESLoad ESLoad;
	
	private String tableName;
	
	private String indexName;
	
	private long start;
	
	private long size;
	
	public MySQLToESDataLoadThread(MysqlSourceConfig mysqlSourceConfig, ESLoad ESLoad, String tableName, String indexName, long start, long size) {
		this.mysqlSourceConfig = mysqlSourceConfig;
		this.ESLoad = ESLoad;
		this.tableName = tableName;
		this.indexName = indexName;
		this.start = start;
		this.size = size;
	}

	@Override
	public void run() {
		List<Map<String, Object>> result = new ArrayList<Map<String, Object>>();
		try {
			result = new MySQLDaoImpl(mysqlSourceConfig).getTableDatasPage(tableName, start, size);
			ESLoad.bulkPutIndex(indexName, result);
		}catch (Exception e) {
			count.getAndSet(Integer.MIN_VALUE);
			log.error("加载数据至ES失败.", e);
		}finally {
			count.getAndAdd(result.size());
		}
	}

}
